Flink事件时间、水印和迟到数据处理
前言
之前的文章中已经屡次提到过Flink的事件时间(event time)、水印(watermark)、乱序(out-of-order)、迟到数据(late element)这些概念。
本文作者是LittleMagic链接:https://www.jianshu.com/p/c612e95a5028
事件时间与水印
所谓事件时间,就是Flink DataStream中的数据元素自身带有的、在其实际发生时记录的时间戳,具有业务含义,并与系统时间独立。很显然,由于外部系统产生的数据往往不能及时、按序到达Flink系统,所以事件时间比处理时间有更强的不可预测性。为了能够准确地表达事件时间的处理进度,就必须用到水印。
Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。
为了形象地说明水印的作用,参考一下下面的图,是一个乱序的基于事件时间的数据流示例。
图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2、3、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。
不过图中暂时没有示出迟到数据。如果事件时间为6的元素出现在W(9)后面,就算是迟到了。迟到数据的处理后面再说。
上面的示例只有一个并行度,那么在有多个并行度的情况下,就会有多个流产生水印,窗口触发时该采用哪个水印呢?答案是所有流入水印中时间戳最小的那个。来自官方文档的图能够说明问题。
容易理解,如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。
提取事件时间、产生水印
上面说了这么多,那么事件时间是如何从数据中提取的,水印又是如何产生的呢?Flink提供了统一的DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印,毕竟它们在处理过程中是紧密联系的。
assignTimestampsAndWatermarks()方法接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(即由事件本身的属性触发)水印,它们的类图如下所示。
public abstract long extractAscendingTimestamp(T element);
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
@Override
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;
}
@Override
public Watermark getCurrentWatermark() {
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return new Watermark(now - 1);
}
IngestionTimeExtractor基于当前系统时钟生成时间戳和水印,其实就是Flink三大时间特征里的摄入时间了。
打点水印
打点水印比周期性水印用的要少不少,并且Flink没有内置的实现,那么就写个最简单的栗子吧。
sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
}
@Override
public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
return element.getTimestamp();
}
});
sourceStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(UserActionRecord record) {
return record.getTimestamp();
}
}
)
.keyBy("platform")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
// ......
// 侧输出的OutputTag
OutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");
sourceStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(UserActionRecord record) {
return record.getTimestamp();
}
}
)
.keyBy("platform")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.sideOutputLateData(lateOutputTag) // 侧输出
.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
// ......
// 获取迟到数据并写入对应Sink
stream.getSideOutput(lateOutputTag).addSink(lateDataSink);
文章不错?点个【在看】吧! 👇